傳統的SaaS稱為Software as a Service,或許我們今天要build的可以更狹義地稱為Solver as a Service,其特色為:
稍等一下,根據[Day08]的精進大綱,沒有今天這個主題呀。
沒錯!其實我們猶豫了一陣子該不該講這個主題呢?不是想藏私,是真正production的環境裡,我們有自建的前端、後端、資料庫、ANSA的前處理節點及LS-DYNA的求解節點來搭配Prefect Cloud
。該如何有個簡單的範例,來講解如何搭配Prefect Cloud
,著實讓我們失眠了幾晚。
幾經思量,我們訂出目標是使用[Day20]部屬於Streamlit Cloud
的App
做為前端,於前端上傳input_data.json
後,透過Prefect CLoud
自動下載input_data.json
及產生box_drop.k
,並自動呼叫LS-DYNA進行求解。
今天的code是由我們自建的Solver as a Service抽取出來,做了大量的簡化,目標是做最基本的概念分享,離實務有點距離。
實務上需要考慮各種可能的情況,諸如:
config上
傳到Linode Object Storage。config
下載至ANSA前處理節點(ex:網路偶而不順時,需設定retry次數)。Prefect
可以視為一種自動化處理資料的工具。只需要對原有的code進行少許修改,就可以快速地串接為自動化的流程。我們也曾經用過Apache Airflow,除了需要寫較多的code外,處理DAGs
有時也會遇到滿棘手的問題。Prefect 2.0
後改進及添加許多功能,而且有了全新的UI,所以我們決定試試換到Prefect
,現在看來成效也還不錯。
Prefect Cloud
是一種orchestration as a service
,需要獲取API KEY
才能進使用其服務。getting-started文件可以找到相關的資訊。
下面簡單說明今天會用到的Prefect
功能:
flow
是Prefect
用來區別不同工作的decorator
。當一個function
被@flow
後,prefect
會知道這是此工作的入口,而flow
包在flow
內作為subflow
及@task
可以幫助我們將工作做更細的規劃。ConcurrentTaskRunner
,但如果我們想要讓工作是有序的執行,則需選擇SequentialTaskRunner
。因為我們的範例只預設一個concurrent job
,所以選哪個應該都可以。package
,現在好像只有一個名為shell_run_command
的function
。此function
已經@task
可以用來執行shell command
。此處我們hardcode四個變數名:
jfile
為下載的json config
本機端檔案路徑。kfile
為ANSA寫出的box_drop.k
本機端檔案路徑。call_ansa_py
為box_drop.py
本機端檔案路徑。s3_url
為透過Streamlit
上傳的遠端config
檔案路徑。由於box drop project有用到numpy
及scipy
,所以這邊需要做點處理。如果是於ANSA的GUI環境下,我們可以直接import numpy
及scipy
使用,但是此處我們是準備透過Prefect
藉由command line來呼叫ANSA,此時當前的Python環境為conda
或自建的venv
等虛擬環境,在ANSA被呼叫之後,是沒有辦法自動引入其附的五個third-party package
。我們的辦法是透過先import sys module
,然後透過sys.path.append
將numpy
及scipy
的所在路徑加入搜尋路徑中。
改寫box_drop.py
中的main function
:
jfile
參數 ,並利用json
讀取此檔案,作為config
。run_dyna
移出此main function
,示範如何建立task
。if __name__ == '__main__':
,原因有二:
main flow
中下載config
。subprocess
直接呼叫box_drop
的main function
。#box_drop.py
import json
import sys
from pathlib import Path
sys.path.append(f'{Path.home()}/BETA_CAE_Systems/shared_v23.0.0/python/linux64/lib/python3.8/site-packages/numpy-1.21.3-py3.8-linux-x86_64.egg')
sys.path.append(f'{Path.home()}/BETA_CAE_Systems/shared_v23.0.0/python/linux64/lib/python3.8/site-packages/scipy-1.7.1-py3.8-linux-x86_64.egg')
def main(jfile):
with open(jfile) as f:
config = json.load(f)
......
#remove run_dyna(file_str)
#remove lines after if __name__ == '__main__':
pull_to_trigger
並以@flow(task_runner=SequentialTaskRunner())
裝飾,作為Prefect
的入口。
urllib.request
下載遠端的config
檔。jfile
是否已經存在本機,如果存在就比較remote
及local
config
檔的MD5 hash
值是否相同,更新is_triggered
。jfile
不存在本機或is_triggered
為真的話,就使用遠端的config
覆蓋寫入jfile
。接著呼叫call_ansa
,並於呼叫完畢後,透過檢查kfile
是否存在,來判斷是否呼叫run_dyna
。#pfct.py
@flow(task_runner=SequentialTaskRunner())
def pull_to_trigger():
ssl._create_default_https_context = ssl._create_unverified_context
is_triggered, is_file = False, Path(jfile).is_file()
with urllib.request.urlopen(s3_url) as resp:
remote_config = resp.read()
if is_file:
with open(jfile, 'rb') as frb:
remote_config_md5 = hashlib.md5(remote_config).hexdigest()
local_config_md5 = hashlib.md5(frb.read()).hexdigest()
is_triggered = remote_config_md5 != local_config_md5
if not is_file or is_triggered:
new_config = json.loads(remote_config.decode("utf-8"))
with open(jfile, 'w') as fw:
json.dump(new_config, fw)
call_ansa(jfile)
if Path(kfile).is_file():
run_dyna()
call_ansa
主要參考Interacting with ANSA
的Running Scripts without GUI
。
下面這段code的白話意思是,在不打開ANSA GUI的情況下,呼叫ANSA執行call_ansa_py
這個檔案中的main function
,並把jfile
作為參數傳給main
。
可能您會問,為什麼不使用shell_run_command
呢?很遺憾,我們試過很多種方式,總是沒辦法呼叫成功,所以只能使用原生的subprocess
。
最後請留意,我們使用了@task
來宣告這是一個task
。
#pfct.py
@task
def call_ansa(jfile):
command = ['ansa',
'-exec',
f"load_script: '{call_ansa_py}'",
'-exec',
f"main('{jfile}')",
'-nogui']
return subprocess.run(command)
run_dyna
使用shell_run_command
呼叫求解的指令。需要注意的是,因為我們是透過prefect
來呼叫LS-DYNA
,所以需要處理路徑問題,要先cd
到當前目錄。
因為prefect
認為@task
內不能再有@task
,所以我們不能將run_dyna
加上@task
,原因是shell_run_command
內已經有@task
了。
但@flow
內可以再有@flow
,所以我們可以給run_dyna
加上@flow
作為的subflow
。
#pfct.py
@flow
def run_dyna():
run_kfile = Path(kfile)
dyna_dir_str = (run_kfile.parent).as_posix()
solver = f'{Path.home()}/LS-DYNA/13.0/smp-dyna_s'
i = run_kfile.as_posix()
command = f'cd {dyna_dir_str};\
{solver} i={i} ncpu=8 memory=1024m d=nodump'
return shell_run_command(command=command, return_all=True)
deploy_pfct.sh
為我們透過指令部署code至Prefect cloud
的script,詳細的說明可以參考官方文件。
prefect work-queue create day21 -l 1
為建立一個concurrent_limit
為1
的work-queue
。prefect deployment build pfct.py:pull_to_trigger -n pull_to_trigger -t pull_to_trigger -q day21
為設定使用day21
這個work-queue
來搭配pfct.pull_to_trigger
這個flow
。並且命名此deployment
為pull_to_trigger
而tag
亦為pull_to_trigger
。prefect deployment apply pull_to_trigger-deployment.yaml
為真正使用pull_to_trigger-deployment.yaml
檔案進行部署。prefect agent start -q 'day21'
為真正啟動work-queue
,準備迎接傳入的flow
。#deploy_pfct.sh
#!/usr/bin/bash
prefect work-queue create day21 -l 1
prefect deployment build pfct.py:pull_to_trigger -n pull_to_trigger -t pull_to_trigger -q day21
prefect deployment apply pull_to_trigger-deployment.yaml
prefect agent start -q 'day21'
接著透過sh deploy_pfct.sh
即可完成部署。
我們可以看到兩個flow
都成功地執行。
pull-to-trigger flow
的詳細log。
run-dyna flow
的詳細log。
於Prefect Cloud
的Deployments
面板裡,有調整scheduling
的選項。這樣一來,您可以設定一個合理的檢查間距,自動去確認是否有flow
需要執行。
請注意因為我們concurrent_limit
是1
,所以當您的檢查間距過密時,flow
可能還沒執行完畢,最後的結果會是不斷延遲並不斷累積flow
。
解決的方法之一是不要有hardcode的jfile
、kfile
甚至s3_url
,至於詳細怎麼做...請留一點給我們賺吧XD
或許明年鐵人賽我們再繼續深入討論?
box drop project至此告一段落。希望諸位收獲滿滿,也看出我們是很認真地在設計分享的內容。如果覺得這些文章對您有幫助的話,可以幫我們點個讚或留言支持我們,感恩!
明天讓我們暫時離開ANSA,利用Streamlit
來建立一個job submitter project,使其可在Windows WSL2
下搭配LS-DYNA small system運行。